package com.econ.springboot.kafka.demo.kafka;

import com.alibaba.fastjson.JSON;
import com.econ.springboot.kafka.demo.dao.AnalogDataMapper;
import com.econ.springboot.kafka.demo.domain.AnalogData;
import com.econ.springboot.kafka.demo.service.AnalogDataService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author jwc
 * @date 2022/10/26
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {

    /**
     * Service for the MongoDB insertion path. Currently unused (the MongoDB
     * listener was removed), but kept so the Lombok-generated constructor /
     * Spring wiring stays unchanged. Remove together with the bean wiring if
     * the MongoDB path is permanently retired.
     */
    private final AnalogDataService analogDataService;

    /** MyBatis mapper performing batch inserts into ClickHouse. */
    private final AnalogDataMapper analogDataMapper;

    /**
     * Shared consumption logic for all partition listeners: parses the record
     * value as a JSON array of {@link AnalogData} and batch-inserts it into
     * ClickHouse, logging the record offset, inserted row count and elapsed
     * milliseconds.
     *
     * @param record Kafka record whose value is a JSON array of AnalogData
     */
    private void consumeToClickHouse(ConsumerRecord<String, String> record) {
        // Offset of the consumed message, logged for traceability.
        long offset = record.offset();
        log.info("消费消息，当前偏移量：{}", offset);
        List<AnalogData> analogDatas = JSON.parseArray(record.value(), AnalogData.class);
        long start = System.currentTimeMillis();
        analogDataMapper.insertBatch(analogDatas);
        long end = System.currentTimeMillis();
        log.info("插入ClickHouse数据数量：{}，耗时：{}", analogDatas.size(), (end - start));
    }

    // One listener per partition of topic "test-jwc5" so the six partitions are
    // consumed in parallel by dedicated listener containers. All listeners
    // delegate to consumeToClickHouse(...) so the logic lives in one place.
    // NOTE(review): a single @KafkaListener with concurrency=6 would achieve the
    // same parallelism with less code, but would change the container ids —
    // kept as six explicit listeners for compatibility.

    /**
     * Consumes partition 0 of topic test-jwc5 and inserts into ClickHouse.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(id = "consumer1", groupId = "test-group-jwc", topicPartitions = {
            @TopicPartition(topic = "test-jwc5", partitions = {"0"})
    })
    public void listenerClickHouse1(ConsumerRecord<String, String> record) {
        consumeToClickHouse(record);
    }

    /**
     * Consumes partition 1 of topic test-jwc5 and inserts into ClickHouse.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(id = "consumer2", groupId = "test-group-jwc", topicPartitions = {
            @TopicPartition(topic = "test-jwc5", partitions = {"1"})
    })
    public void listenerClickHouse2(ConsumerRecord<String, String> record) {
        consumeToClickHouse(record);
    }

    /**
     * Consumes partition 2 of topic test-jwc5 and inserts into ClickHouse.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(id = "consumer3", groupId = "test-group-jwc", topicPartitions = {
            @TopicPartition(topic = "test-jwc5", partitions = {"2"})
    })
    public void listenerClickHouse3(ConsumerRecord<String, String> record) {
        consumeToClickHouse(record);
    }

    /**
     * Consumes partition 3 of topic test-jwc5 and inserts into ClickHouse.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(id = "consumer4", groupId = "test-group-jwc", topicPartitions = {
            @TopicPartition(topic = "test-jwc5", partitions = {"3"})
    })
    public void listenerClickHouse4(ConsumerRecord<String, String> record) {
        consumeToClickHouse(record);
    }

    /**
     * Consumes partition 4 of topic test-jwc5 and inserts into ClickHouse.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(id = "consumer5", groupId = "test-group-jwc", topicPartitions = {
            @TopicPartition(topic = "test-jwc5", partitions = {"4"})
    })
    public void listenerClickHouse5(ConsumerRecord<String, String> record) {
        consumeToClickHouse(record);
    }

    /**
     * Consumes partition 5 of topic test-jwc5 and inserts into ClickHouse.
     *
     * @param record the consumed Kafka record
     */
    @KafkaListener(id = "consumer6", groupId = "test-group-jwc", topicPartitions = {
            @TopicPartition(topic = "test-jwc5", partitions = {"5"})
    })
    public void listenerClickHouse6(ConsumerRecord<String, String> record) {
        consumeToClickHouse(record);
    }
}
